package com.atguigu.flink.sql.function;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;

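/**
 * Demo of a custom Flink SQL aggregate function: registers {@code MyAvg} under the
 * name {@code avgVc} and uses it to compute the average {@code vc} per {@code id}.
 *
 * Created by Smexy on 2023/11/20
 */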
public class Demo3_AggFunctionCustom
{
    /**
     * Entry point: declares the source table {@code t1}, registers {@link MyAvg} as the
     * SQL function {@code avgVc}, then runs a grouped query and prints its result.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {

        // Table environment configured for streaming mode.
        final TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance()
                        .inStreamingMode()
                        .build());

        // Source table t1 (id, ts, vc) read from the JSON file data/ws.json.
        final String ddl = "CREATE TABLE t1 (" +
            "  id STRING," +
            "  ts BIGINT," +
            "  vc INT " +
            ")  WITH (" +
            "  'connector' = 'filesystem',   " +
            "  'path' = 'data/ws.json', " +
            "  'format' = 'json'             " +
            ")";
        tableEnv.executeSql(ddl);

        // Register a MyAvg instance under the SQL name "avgVc".
        tableEnv.createTemporaryFunction("avgVc", new MyAvg());

        // Apply the custom aggregate to vc for each id and print the result.
        final String query = "select id,avgVc(vc) from t1 group by id";
        tableEnv.sqlQuery(query)
                .execute()
                .print();
    }

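    /*
        Example: compute the average water level (vc) for each sensor (id).
        Extend AggregateFunction<T, ACC> from the org.apache.flink.table.functions package:
            T:   the type of the final result
            ACC: the type of the accumulator used during aggregation
     */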
    /**
     * User-defined aggregate function that returns the arithmetic mean of its
     * {@code Integer} input values as a {@code Double}.
     *
     * <p>NULL inputs are skipped, and an accumulator that has seen no values produces
     * a {@code null} result instead of {@code NaN}.
     */
    public static class MyAvg extends AggregateFunction<Double, MyAcc>
    {

        /**
         * Computes the function result from the accumulator.
         *
         * @param accumulator the accumulator holding the running sum and count
         * @return {@code sumVc / count}, or {@code null} when no value has been accumulated
         */
        @Override
        public Double getValue(MyAcc accumulator) {
            // Guard against an empty accumulator: 0.0 / 0 would otherwise yield NaN.
            if (accumulator.count == null || accumulator.count == 0) {
                return null;
            }
            return accumulator.sumVc / accumulator.count;
        }

        /**
         * Creates a fresh accumulator with a zero sum and a zero count.
         *
         * @return a new, empty accumulator
         */
        @Override
        public MyAcc createAccumulator() {
            return new MyAcc(0d, 0);
        }

        /**
         * Folds one input value into the accumulator.
         *
         * <p>The parameters after the accumulator mirror the arguments of the SQL call:
         * a call such as {@code myAvg(a, b, c)} would need three parameters, in that order.
         *
         * @param acc the accumulator to update
         * @param vc  the input value; ignored when {@code null}
         */
        public void accumulate(MyAcc acc, Integer vc) {
            // Skip NULL inputs; unboxing a null Integer would throw a NullPointerException.
            if (vc == null) {
                return;
            }
            acc.count = acc.count + 1;
            acc.sumVc = acc.sumVc + vc;
        }
    }

    /**
     * Accumulator used by {@link MyAvg}: holds the running sum and the number of
     * values accumulated so far. Constructors and accessors are supplied by the
     * Lombok annotations below.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class MyAcc{
        // Running sum of the vc values passed to MyAvg.accumulate.
        private Double sumVc;
        // Number of values added to sumVc.
        private Integer count;
    }
}
